
[New Connector] Jira connector - On Cloud and Server #527

Merged (16 commits) on Mar 24, 2023

Conversation

@moxarth-elastic (Collaborator) commented Feb 21, 2023

PR Content

Jira connector with support for both cloud and on-prem platforms.

  • Ping functionality with unit test cases.
  • Indexing functionality with configured connection string.
  • E2E test cases using Flask.

Checklists

  • Covered the changes with automated tests
  • Tested the changes locally
  • Add a module or a directory in connectors/sources
  • Implement a class that implements all methods described in connectors.source.BaseDataSource
  • Add a unit test in connectors/sources/tests with +90% coverage
  • Declare your connector in config.yml in the sources section
  • Declare your dependencies in requirements. Make sure you pin these dependencies
  • Make sure you use an async lib for your source. If not possible, make sure you don't block the loop
  • When possible, provide a docker image that runs the backend service, so we can test the connector. If you can't provide a docker image, provide the credentials needed to run against an online service.
  • The test backend needs to return more than 10k documents, since 10k is the default size limit for Elasticsearch pagination. Returning more than 10k documents from the test backend helps exercise the connector more thoroughly.
  • Added functional test to the nightly buildkite

New Dependency

  • New external service dependency added, i.e. Flask, used to run the E2E tests.

@moxarth-elastic moxarth-elastic marked this pull request as ready for review February 21, 2023 17:35
Comment on lines 335 to 336
if not self.ssl_disabled:
self.ssl_ctx = self._ssl_context(certificate=self.certificate)
Member

This should not happen in ping: it's not used in this function, and get_docs depends on it. You should initialise it in such a way that both ping and get_docs work if called individually, not only sequentially.

Collaborator Author

Understood your concern. However, the connector is designed such that ping always executes first, followed by get_docs, so there is no chance of get_docs being called on its own, since the framework itself does not allow it. Once we have a conclusion on this comment, we'll update the code accordingly.

Member

It's not designed this way; it just works this way now because it was the most convenient approach. It might, and will, change in the future, so we want to build connectors that are resilient to such changes.

In short, you should treat ping() and get_docs() as methods that can be called in any sequence or even on their own; all three of these cases are valid:

  1. ping(); get_docs()
  2. ping(); ping()
  3. get_docs()

Collaborator Author

Oh, got it. However, this is related to this comment, so do you want me to change it before we reach a conclusion there, or should I wait? In the meantime, I'll address the other comments.

Member

I'll try to address the topic in that comment in the meantime and get back to you ASAP.

Collaborator Author

Sure, please. Do let me know your opinion on this.

Collaborator Author

Keeping this in both places, i.e. ping and get_docs.
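For illustration, a minimal sketch of initialising the SSL context lazily so that ping and get_docs each work when called on their own. The configuration keys and the _get_ssl_context helper are assumptions for the sketch, not the actual implementation:

```python
import ssl


class JiraDataSource:
    def __init__(self, configuration):
        # Field names here are illustrative.
        self.ssl_disabled = configuration["ssl_disabled"]
        self.certificate = configuration["ssl_ca"]
        self.ssl_ctx = None  # built on first use, not inside ping()

    def _ssl_context(self, certificate):
        # Build an SSL context from the configured CA certificate (PEM string).
        ctx = ssl.create_default_context()
        ctx.load_verify_locations(cadata=certificate)
        return ctx

    def _get_ssl_context(self):
        # Lazily create the context so ping() and get_docs() can each be
        # called first (or alone) without relying on call order.
        if not self.ssl_disabled and self.ssl_ctx is None:
            self.ssl_ctx = self._ssl_context(certificate=self.certificate)
        return self.ssl_ctx

    async def ping(self):
        ssl_ctx = self._get_ssl_context()
        ...

    async def get_docs(self):
        ssl_ctx = self._get_ssl_context()
        ...
```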



@pytest.mark.asyncio
async def test_get_docs():
Member

Ideally you should test multiple cases with different data types returned here. This is the main function you're implementing, and it has only a single test for a very basic scenario; can you make sure you test the method properly?

Collaborator Author (@moxarth-elastic, Feb 24, 2023)

It would be helpful if you could give some more detail on what kind of tests you want for this method, since this test already covers multiple data types, including project, issue, and attachment.

moxarth-elastic and others added 2 commits February 22, 2023 21:43
Co-authored-by: Artem Shelkovnikov <lavatroublebubble@gmail.com>
Co-authored-by: Artem Shelkovnikov <lavatroublebubble@gmail.com>
@moxarth-elastic
Collaborator Author

@tarekziade did you get a chance to review this PR? If you have any feedback, I will address it along with Artem's comments.

connectors/sources/jira.py (resolved)
Comment on lines +201 to +204
basic_auth = aiohttp.BasicAuth(
login=auth[0],
password=auth[1],
)
Member

Will this still work if self.is_cloud is True?

Collaborator Author

Yes, this will work for both cloud and on-prem.

Member

I just want to confirm: if self.is_cloud is True, you are using service_account_id and api_token to build the basic auth?
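For reference, a minimal sketch of how the auth pair could be chosen per deployment type and passed to aiohttp.BasicAuth. The configuration keys shown are assumptions based on this discussion, not the connector's actual fields:

```python
import aiohttp


def basic_auth_for(configuration, is_cloud):
    # Jira Cloud authenticates with the service account id plus an API token,
    # while Jira Server uses a regular username/password pair.
    if is_cloud:
        auth = (configuration["service_account_id"], configuration["api_token"])
    else:
        auth = (configuration["username"], configuration["password"])
    return aiohttp.BasicAuth(login=auth[0], password=auth[1])
```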

yield item

await self.fetchers.join()
await asyncio.gather(project_task, attachment_task)
Member

Is this line needed? If you exit the infinite loop above, does it mean both project_task and attachment_task are done?

Collaborator Author (@moxarth-elastic, Mar 2, 2023)

Yes, this line runs the producers. Once the await finishes, both project_task and attachment_task are complete. The consumer loop is independent of this gather; its responsibility is to consume the documents added by the producer tasks. The consumer loop only ends once it has received the required number of FINISHED markers from the queue, which are added by the producer tasks themselves. So, in a way, the consumer and producers communicate via the FINISHED signal.
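For illustration, a simplified sketch of the producer/consumer handshake being described here, using a plain asyncio.Queue in place of the framework's MemQueue:

```python
import asyncio

FINISHED = "FINISHED"


async def producer(queue, items):
    # Each producer pushes its documents and then signals completion.
    for item in items:
        await queue.put(item)
    await queue.put(FINISHED)


async def get_docs():
    queue = asyncio.Queue()
    producers = [
        asyncio.create_task(producer(queue, ["project-1", "project-2"])),
        asyncio.create_task(producer(queue, ["issue-1", "issue-2"])),
    ]

    # Consumer loop: yield items until every producer has signalled FINISHED.
    finished = 0
    while finished < len(producers):
        item = await queue.get()
        if item == FINISHED:
            finished += 1
            continue
        yield item

    # By the time the loop exits, all producers have completed; gather simply
    # collects the finished tasks (and surfaces any exceptions they raised).
    await asyncio.gather(*producers)


async def main():
    async for doc in get_docs():
        print(doc)


if __name__ == "__main__":
    asyncio.run(main())
```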

@moxarth-elastic
Collaborator Author

buildkite test this

Contributor

@timgrein left a comment

I think we should use the same pattern as in the Confluence connector, encapsulating and memoizing the client instance.

f"Configured concurrent downloads can't be set more than {MAX_CONCURRENT_DOWNLOADS}."
)

def _generate_session(self):
Contributor

I would really like to see the same pattern we've used for the Confluence connector (see: https://github.com/elastic/connectors-python/pull/568/files), i.e. creating a JiraClient class that encapsulates the interaction with the server/cloud instance. It makes the connectors much more consistent.
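As an illustration of the pattern being suggested (modelled loosely on the Confluence connector; method and attribute names here are assumptions, not the final implementation):

```python
import aiohttp


class JiraClient:
    """Encapsulates and memoizes the HTTP session used to talk to Jira."""

    def __init__(self, configuration):
        self.host_url = configuration["host_url"]  # illustrative key
        self._session = None

    def _get_session(self):
        # Memoize the session so repeated calls reuse one connection pool.
        if self._session is None:
            self._session = aiohttp.ClientSession(raise_for_status=True)
        return self._session

    async def api_call(self, url):
        async with self._get_session().get(url=url) as response:
            return await response.json()

    async def close(self):
        if self._session is not None:
            await self._session.close()
            self._session = None


class JiraDataSource:
    def __init__(self, configuration):
        # The data source delegates all server/cloud interaction to the client.
        self.jira_client = JiraClient(configuration)
```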

Collaborator Author

Sure. However, the testing cycle has already been completed and it would take considerable effort from the QA end to re-test with new changes, so I strongly recommend we raise a GitHub issue and cover this as an enhancement PR. Let me know your thoughts.

Member

@wangch079 left a comment

I had concerns about how the get_docs is implemented.

It creates a ConcurrentTasks object, fetchers, to download attachments:
https://github.com/elastic/connectors-python/blob/1c1d6a747221581ba23a713470540a5759cd2527/connectors/sources/jira.py#L88

But I think this is not necessary. The connector implementation should just yield a lazy_download (i.e. a callback), and the framework will handle the download at the framework level:
https://github.com/elastic/connectors-python/blob/1c1d6a747221581ba23a713470540a5759cd2527/connectors/byoei.py#L298-L304

So I think all get_docs should do is just yield project, issue, and attachment documents.

I also think it's overkill to use a queue. Having a dedicated coroutine for projects and issues is not a big deal, but I don't feel it's necessary. cc @tarekziade
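For illustration, a sketch of the shape being suggested: get_docs yields each document together with an optional download callback (a partial over get_content), and the framework decides when to invoke it. The helper names (_fetch_projects, _fetch_issues) and the exact tuple signature the framework expects are assumptions for the sketch:

```python
from copy import copy
from functools import partial


class JiraDataSource:
    async def get_docs(self):
        async for project in self._fetch_projects():  # hypothetical helper
            # Projects and issues carry no binary content, so no callback.
            yield project, None

        async for issue, attachments in self._fetch_issues():  # hypothetical helper
            yield issue, None
            for attachment in attachments:
                document = {
                    "_id": attachment["id"],
                    "_timestamp": attachment["created"],
                }
                # The framework invokes the partial later (lazy download),
                # only when it actually needs the attachment content.
                yield document, partial(
                    self.get_content,
                    issue_key=issue["key"],
                    attachment=copy(attachment),
                )
```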

@tarekziade
Contributor

tarekziade commented Mar 21, 2023

> I had concerns about how the get_docs is implemented.

I'll do a full review now

> I also think it's overkill to use a queue. Having a dedicated coroutine for projects and issues is not a big deal, but I don't feel it's necessary. cc @tarekziade

ConcurrentTasks is just a way to group async tasks together. I don't think it's a problem to do it this way if this source class can boost how fast it browses content from the 3rd party and stream it into a queue in a controlled, throttled way. Here it's a nice way to browse different kinds of objects in parallel, all of which end up being docs (projects and issues).

This technique reduces the time the connector takes to grab content.

tarekziade previously approved these changes Mar 21, 2023
Contributor

@tarekziade left a comment

just a few nits. LGTM otherwise

connectors/sources/jira.py (resolved)
URLS = {
PING: "/rest/api/2/myself",
PROJECT: "/rest/api/2/project?expand=description,lead,url",
ISSUES: "/rest/api/2/search?maxResults={maxResults}&startAt={startAt}",
Contributor

PEP 8 like the others, so:

/rest/api/2/search?maxResults={max_results}&startAt={start_at}

Collaborator Author

Got it

FETCH_SIZE = 100
CHUNK_SIZE = 1024
QUEUE_SIZE = 1024
QUEUE_MEM_SIZE = 5 # Size in Megabytes
Contributor

if it's MiB, add 1024*1024 here

self.session = None
self.tasks = 0
self.queue = MemQueue(
maxsize=QUEUE_SIZE, maxmemsize=QUEUE_MEM_SIZE * 1024 * 1024
Contributor

see my previous comment about MiB

should_paginate = True
while should_paginate:
async for response in self._api_call(
url_name=url_name, startAt=start_at, maxResults=FETCH_SIZE
Contributor

pep8


attachment_name = attachment["filename"]
if os.path.splitext(attachment_name)[-1] not in TIKA_SUPPORTED_FILETYPES:
logger.debug(f"{attachment_name} is not supported by TIKA, skipping")
Contributor

logger.warning ?

Collaborator Author

Yes, we can put a warning logger instead.

"_timestamp": attachment["created"],
}
temp_filename = ""
attachment_url = ATTACHMENT_CLOUD if self.is_cloud else ATTACHMENT_SERVER
Contributor

cc @artem-shelkovnikov this is where #534 would be useful

async with aiofiles.open(file=temp_filename, mode="r") as async_buffer:
# base64 on macOS will add a EOL, so we strip() here
document["_attachment"] = (await async_buffer.read()).strip()
await remove(temp_filename)
Contributor

Let's wrap this in a try/except with just a warning if remove fails.
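A small sketch of the change being asked for, wrapping the temp-file cleanup so a failed removal only logs a warning (remove as the async helper imported in the snippet above, assumed to be aiofiles.os.remove; the message wording is illustrative):

```python
import logging

from aiofiles.os import remove

logger = logging.getLogger(__name__)


async def cleanup(temp_filename):
    try:
        await remove(temp_filename)
    except Exception as exception:
        # A failed cleanup should not abort the sync; just warn and move on.
        logger.warning(
            f"Could not remove temporary file {temp_filename}. Error: {exception}"
        )
```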

attachments=attachments, issue_key=issue_key
):
await self.queue.put(
( # pyright: ignore
Contributor

What does pyright say here?

Collaborator Author

Error we're getting here:

[screenshot of the pyright error]

document_task = asyncio.create_task(_document_task())
self.tasks += 1

# Consumer block to grab items from queue in a loop and yield one at a time.
Contributor

nice!

@moxarth-elastic
Collaborator Author

moxarth-elastic commented Mar 22, 2023

> ConcurrentTasks is just a way to group async tasks together. I don't think it's a problem to do it this way if this source class can boost how fast it browses content from the 3rd party and stream it into a queue in a controlled, throttled way. Here it's a nice way to browse different kinds of objects in parallel, all of which end up being docs (projects and issues).

FYI @tarekziade, after adding concurrency using ConcurrentTasks, we hit the QueueFull error several times, which terminated the connector. So, can we add a debug or warning log for the QueueFull condition instead of raising it?

@tarekziade
Contributor

tarekziade commented Mar 24, 2023

> ConcurrentTasks is just a way to group async tasks together. I don't think it's a problem to do it this way if this source class can boost how fast it browses content from the 3rd party and stream it into a queue in a controlled, throttled way. Here it's a nice way to browse different kinds of objects in parallel, all of which end up being docs (projects and issues).

> FYI @tarekziade, after adding concurrency using ConcurrentTasks, we hit the QueueFull error several times, which terminated the connector. So, can we add a debug or warning log for the QueueFull condition instead of raising it?

The way it works:

So if it's raising, it means you are filling the queue very fast and the rest of the pipeline can't keep up.

Try this:

self.queue = MemQueue(maxsize=0, maxmemsize=10*1024*1024, refresh_timeout=120)

(There is no need to limit the number of items to 1024; I noticed you added that limit, and you probably hit it first.)

Contributor

@tarekziade left a comment

LGTM, just a few nits to address

url_name=url_name, start_at=start_at, max_results=FETCH_SIZE
):
response_json = await response.json()
total = response_json.get("total")
Contributor

If total does not exist, you get None and the next check will fail.

If you always get total and it's an int, change this to:

total = response_json["total"]

If not, make the code more robust:

total = int(response_json.get("total", 0))

logger.debug(f"Calling convert_to_b64 for file : {attachment_name}")
await asyncio.to_thread(convert_to_b64, source=temp_filename)
async with aiofiles.open(file=temp_filename, mode="r") as async_buffer:
# base64 on macOS will add a EOL, so we strip() here
Contributor

Please open an issue about this so we have the same behavior for convert_to_b64 on all platforms.

yield {
"_id": f"{project['name']}-{project['id']}",
"_timestamp": iso_utc(
when=datetime.now(pytz.timezone(self.timezone))
Contributor

Why do you pass when? It looks like the default value, as it's done in iso_utc.

Collaborator Author

We index projects every time with a _timestamp generated via iso_utc, while other Jira documents use the timestamp taken from the response. So, if the Jira server is hosted in a timezone other than UTC, users would see a timestamp mismatch between projects and other objects in terms of timezone. To keep it consistent, we feed when to iso_utc using the Jira server's timezone.
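A small illustration of the mismatch being described: a timestamp rendered in the server's local timezone and one rendered in UTC describe the same instant but carry different offsets, which is why the project timestamp is generated from the Jira server's timezone here (the timezone name is just an example):

```python
from datetime import datetime, timezone

import pytz

jira_server_tz = pytz.timezone("Asia/Kolkata")  # example server timezone

utc_now = datetime.now(timezone.utc).isoformat()
server_now = datetime.now(jira_server_tz).isoformat()

# Both strings describe the same instant but with different offsets; mixing
# them naively across documents makes timestamps look inconsistent.
print(utc_now)     # e.g. 2023-03-24T10:00:00+00:00
print(server_now)  # e.g. 2023-03-24T15:30:00+05:30
```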

partial(
self.get_content,
issue_key=issue_key,
attachment=copy(attachment),
Contributor

why copy here?

Collaborator Author

Here, attachment is a dictionary, so any changes made after building the partial should not be reflected inside the deferred call. Hence, we pass a copy to isolate the partial from later mutations of the passed parameter.
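A small self-contained example of the behaviour being described: without the copy, a later mutation of the dict is visible when the deferred partial finally runs; with the copy, it is not (the names here are purely illustrative):

```python
from copy import copy
from functools import partial


def download(attachment):
    return attachment["filename"]


attachment = {"filename": "report.pdf"}

without_copy = partial(download, attachment=attachment)
with_copy = partial(download, attachment=copy(attachment))

# Mutate the original dict after the partials were created.
attachment["filename"] = "changed.pdf"

print(without_copy())  # "changed.pdf" - sees the later mutation
print(with_copy())     # "report.pdf"  - isolated by the copy
```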

certificate = certificate.replace(" ", "\n")
certificate = " ".join(certificate.split("\n", 1))
certificate = " ".join(certificate.rsplit("\n", 1))
certificate = get_pem_format(certificate, max_split=1)
Contributor

nice!

@tarekziade tarekziade self-requested a review March 24, 2023 05:20
tarekziade previously approved these changes Mar 24, 2023
Contributor

@tarekziade left a comment

Approved, pending one more approval since it's a big patch, plus my nits to address.

Contributor

@timgrein left a comment

LGTM

@timgrein timgrein merged commit ee2c4a1 into elastic:main Mar 24, 2023
@github-actions

💔 Failed to create backport PR(s)

Branch: 8.8
Result: The branch "8.8" is invalid or doesn't exist

To backport manually run:
backport --pr 527 --autoMerge --autoMergeMethod squash
